time = datetime.strptime('2014-04-01T12:00:00.000Z', "%Y-%m-%dT%H:%M:%S.%f%z")
bbox = ge.QueryRectangle(
ge.BoundingBox2D(-180.0, -90.0, 180.0, 90.0),
ge.TimeInterval(time, time),
ge.SpatialResolution(0.1, 0.1),
)
# async def handler(websocket):
# consumer_task = asyncio.create_task(consumer_handler(websocket))
# producer_task = asyncio.create_task(producer_handler(websocket))
# done, pending = await asyncio.wait(
# [consumer_task, producer_task],
# return_when=asyncio.FIRST_COMPLETED,
# )
# for task in pending:
# task.cancel()
async def client(uri):
async with connect(
uri,
extra_headers=ge.get_session().auth_header,
) as websocket:
# await handler(websocket)
global_tile = None
while websocket.open:
try:
await websocket.send("NEXT")
arrow_ipc = await websocket.recv()
record_batch = read_arrow_ipc(arrow_ipc)
tile = create_xarray(record_batch)
# print(tile)
print("LOCAL")
tile.plot()
plt.show()
print("GLOBAL")
global_tile = merge_tiles(global_tile, tile)
global_tile.plot()
plt.show()
except Exception as e:
print("An error occurred:")
print(e)
if websocket.open:
await websocket.close()
def read_arrow_ipc(arrow_ipc):
reader = pa.ipc.open_file(arrow_ipc)
record_batch = reader.get_record_batch(0)
return record_batch
def create_xarray(record_batch):
metadata = record_batch.schema.metadata
spatial_partition: ge.api.SpatialPartition2D = json.loads(metadata[b'spatialPartition'])
x_size = int(metadata[b'xSize'])
y_size = int(metadata[b'ySize'])
arrow_array = record_batch.column(0)
xmin = spatial_partition['upperLeftCoordinate']['x']
xmax = spatial_partition['lowerRightCoordinate']['x']
ymin = spatial_partition['lowerRightCoordinate']['y']
ymax = spatial_partition['upperLeftCoordinate']['y']
return xr.DataArray(
arrow_array.to_numpy(zero_copy_only=False).reshape(x_size, y_size),
dims=["y", "x"],
coords={
'x': np.linspace(xmin, xmax, x_size, endpoint=False),
'y': np.linspace(ymax, ymin, y_size, endpoint=False),
'time': datetime.strptime('2014-04-01T12:00:00.000Z', "%Y-%m-%dT%H:%M:%S.%f%z"),
},
)
def merge_tiles(global_tile, tile):
if global_tile is None:
return tile
try:
# can be used as long as the coordinates are not yet in the dataset (e.g., as NaN)
return xr.combine_by_coords(
[global_tile, tile],
)
except:
# can be used to fill in the coordinates that were previously NaN
return global_tile.combine_first(tile)
ws_server_url = ge.get_session().server_url.replace("http", "ws")
url = f"{ws_server_url}/workflow/{workflow}/rasterStream?resultType=arrow&spatialBounds={bbox.bbox_str}&timeInterval={urllib.parse.quote(bbox.time_str)}&spatialResolution={bbox.spatial_resolution}"
print(url)
await client(url)